Spark Quick Start
Setup
To use LakeSoul in Spark, first configure the Spark catalog. LakeSoul implements its data source and catalog on top of Apache Spark's DataSourceV2 API. In addition, LakeSoul provides a Scala table API that extends the functionality of LakeSoul tables.
Spark 3 Support Matrix
| LakeSoul | Spark Version |
|---|---|
| 2.2.x-2.4.x | 3.3.x |
| 2.0.x-2.1.x | 3.1.x |
Spark Shell/SQL/PySpark
Use the LakeSoulSparkSessionExtension SQL extension when running spark-shell / spark-sql / pyspark.
- Spark SQL
- Scala
- PySpark
spark-sql --conf spark.sql.extensions=com.dmetasoul.lakesoul.sql.LakeSoulSparkSessionExtension --conf spark.sql.catalog.lakesoul=org.apache.spark.sql.lakesoul.catalog.LakeSoulCatalog --conf spark.sql.defaultCatalog=lakesoul --jars lakesoul-spark-spark-3.3-2.6.0.jar
spark-shell --conf spark.sql.extensions=com.dmetasoul.lakesoul.sql.LakeSoulSparkSessionExtension --conf spark.sql.catalog.lakesoul=org.apache.spark.sql.lakesoul.catalog.LakeSoulCatalog --conf spark.sql.defaultCatalog=lakesoul --jars lakesoul-spark-spark-3.3-2.6.0.jar
wget https://github.com/lakesoul-io/LakeSoul/tree/main/python/lakesoul/spark/tables.py
pyspark --conf spark.sql.extensions=com.dmetasoul.lakesoul.sql.LakeSoulSparkSessionExtension --conf spark.sql.catalog.lakesoul=org.apache.spark.sql.lakesoul.catalog.LakeSoulCatalog --conf spark.sql.defaultCatalog=lakesoul --jars lakesoul-spark-spark-3.3-2.6.0.jar --py-files tables.py
Maven Project Dependency
<dependency>
<groupId>com.dmetasoul</groupId>
<artifactId>lakesoul-spark</artifactId>
<version>3.3-2.6.0</version>
</dependency>
- Scala
// Scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types._
import org.apache.spark.sql.Row
import org.apache.spark.sql.lakesoul.LakeSoulOptions
import com.dmetasoul.lakesoul.tables.LakeSoulTable

val builder = SparkSession.builder()
  .master("local")
  .config("spark.sql.extensions", "com.dmetasoul.lakesoul.sql.LakeSoulSparkSessionExtension")
  .config("spark.sql.catalog.lakesoul", "org.apache.spark.sql.lakesoul.catalog.LakeSoulCatalog")
  .config("spark.sql.defaultCatalog", "lakesoul")
val spark = builder.getOrCreate()
// import implicits after the session exists, so that toDF / $"col" syntax is available
import spark.implicits._
Create a Namespace
First, create a namespace for the LakeSoul table. If you skip this step, the default namespace is used; the default namespace of the LakeSoul catalog is default.
- Spark SQL
- Scala
- PySpark
CREATE NAMESPACE IF NOT EXISTS lakesoul_namespace;
USE lakesoul_namespace;
SHOW TABLES;
// Scala
spark.sql("CREATE NAMESPACE IF NOT EXISTS lakesoul_namespace")
spark.sql("USE lakesoul_namespace")
spark.sql("SHOW TABLES")
# python
spark.sql("CREATE NAMESPACE IF NOT EXISTS lakesoul_namespace")
spark.sql("USE lakesoul_namespace")
spark.sql("SHOW TABLES")
Create a Table
Create a partitioned LakeSoul table with the USING lakesoul clause, or use the DataFrameWriterV2 API, which creates the corresponding LakeSoul table automatically on the first write (see the sketch after the examples below).
- Spark SQL
- Scala
- PySpark
CREATE TABLE lakesoul_table (id BIGINT, name STRING, `date` STRING)
USING lakesoul
PARTITIONED BY (`date`)
LOCATION 'file:/tmp/lakesoul_namespace/lakesoul_table';
// Scala
spark.sql("CREATE TABLE lakesoul_table (id BIGINT, name STRING, `date` STRING) USING lakesoul PARTITIONED BY (`date`) LOCATION 'file:/tmp/lakesoul_namespace/lakesoul_table'")
# python
spark.sql("CREATE TABLE lakesoul_table (id BIGINT, name STRING, `date` STRING) USING lakesoul PARTITIONED BY (`date`) LOCATION 'file:/tmp/lakesoul_namespace/lakesoul_table'")
Primary Key Table
In LakeSoul, a table with primary keys is defined as a hash-partitioned table. Use the USING lakesoul clause together with TBLPROPERTIES, where 'hashPartitions' specifies a comma-separated list of primary key columns and 'hashBucketNum' specifies the number of hash buckets, to create a hash-partitioned LakeSoul table (a composite-key sketch follows the examples below).
- Spark SQL
- Scala
- PySpark
CREATE TABLE lakesoul_hash_table (id BIGINT NOT NULL, name STRING, date STRING)
USING lakesoul
PARTITIONED BY (date)
LOCATION 'file:/tmp/lakesoul_namespace/lakesoul_hash_table'
TBLPROPERTIES ( 'hashPartitions'='id', 'hashBucketNum'='2');
// Scala
spark.sql("CREATE TABLE lakesoul_hash_table (id BIGINT NOT NULL, name STRING, date STRING) USING lakesoul PARTITIONED BY (date) LOCATION 'file:/tmp/lakesoul_namespace/lakesoul_hash_table' TBLPROPERTIES ( 'hashPartitions'='id', 'hashBucketNum'='2')")
# python
spark.sql("CREATE TABLE lakesoul_hash_table (id BIGINT NOT NULL, name STRING, date STRING) USING lakesoul PARTITIONED BY (date) LOCATION 'file:/tmp/lakesoul_namespace/lakesoul_hash_table' TBLPROPERTIES ( 'hashPartitions'='id', 'hashBucketNum'='2')")
Primary Key CDC Table
Hash-partitioned LakeSoul tables offer optional change data capture (CDC), which records how the data changes. To create a CDC-enabled LakeSoul table, add an extra TBLPROPERTIES setting to the hash-partitioned table DDL that specifies the 'lakesoul_cdc_change_column' property. This property names an implicit column through which the table carries CDC information, enabling precise tracking and management of data changes.
- Spark SQL
- Scala
- PySpark
CREATE TABLE lakesoul_cdc_table (id BIGINT NOT NULL, name STRING, date STRING)
USING lakesoul
PARTITIONED BY (date)
LOCATION 'file:/tmp/lakesoul_namespace/lakesoul_cdc_table'
TBLPROPERTIES('hashPartitions'='id', 'hashBucketNum'='2', 'lakesoul_cdc_change_column' = 'op');
// Scala
spark.sql("CREATE TABLE lakesoul_cdc_table (id BIGINT NOT NULL, name STRING, date STRING) USING lakesoul PARTITIONED BY (date) LOCATION 'file:/tmp/lakesoul_namespace/lakesoul_cdc_table' TBLPROPERTIES('hashPartitions'='id', 'hashBucketNum'='2', 'lakesoul_cdc_change_column' = 'op')")
# python
spark.sql("CREATE TABLE lakesoul_cdc_table (id BIGINT NOT NULL, name STRING, date STRING) USING lakesoul PARTITIONED BY (date) LOCATION 'file:/tmp/lakesoul_namespace/lakesoul_cdc_table' TBLPROPERTIES('hashPartitions'='id', 'hashBucketNum'='2', 'lakesoul_cdc_change_column' = 'op')")
Insert and Merge Data
To write data to a non-hash-partitioned table with Spark SQL, use the INSERT INTO statement.
To write data to a table with a DataFrame, use the DataFrameWriterV2 API; if this is the first write to that table, the corresponding LakeSoul table is also created automatically (a DataFrameWriterV2 sketch follows the examples below).
- Spark SQL
- Scala
- PySpark
INSERT INTO TABLE lakesoul_table VALUES (1, 'Alice', '2024-01-01'), (2, 'Bob', '2024-01-01'), (1, 'Cathy', '2024-01-02');
// Scala
val data = Seq(Row(1L, "Alice", "2024-01-01"), Row(2L, "Bob", "2024-01-01"), Row(1L, "Cathy", "2024-01-02"))
val schema = StructType(Seq(StructField("id", LongType, false), StructField("name", StringType, true), StructField("date", StringType, false)))
val df = spark.createDataFrame(spark.sparkContext.parallelize(data), schema)
df.write.format("lakesoul").insertInto("lakesoul_table")
# python
from pyspark.sql.types import *
data = [(1,"Cathy","2024-01-02")]
schema = StructType([StructField("id", LongType(), False), StructField("name", StringType(), True), StructField("date", StringType(), False)])
df = spark.createDataFrame(data,schema=schema)
df.write.format("lakesoul").insertInto("lakesoul_table")
To write data to a hash-partitioned table with Spark SQL, use the MERGE INTO statement.
To write data to a hash-partitioned table with a DataFrame, use the upsert API of LakeSoulTable.
- Spark SQL
- Scala
- PySpark
CREATE OR REPLACE VIEW spark_catalog.default.source_view (id , name, date)
AS SELECT 1L as `id`, 'George' as `name`, '2024-01-01' as `date`;
MERGE INTO lakesoul_hash_table AS t
USING spark_catalog.default.source_view AS s
ON t.id = s.id
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *;
// Scala
import com.dmetasoul.lakesoul.tables.LakeSoulTable
val tablePath = "file:/tmp/lakesoul_namespace/lakesoul_upsert_table"
// Init hash table with first dataframe
val df = Seq((20201101, 1, 1), (20201101, 2, 2), (20201101, 3, 3), (20201102, 4, 4)).toDF("range", "hash", "value")
val writer = df.write.format("lakesoul").mode("overwrite")
writer
.option("rangePartitions", "range")
.option("hashPartitions", "hash")
.option("hashBucketNum", 2)
.save(tablePath)
// merge the second dataframe into hash table using LakeSoulTable upsert API
val dfUpsert = Seq((20201101, 1, 1), (20201101, 2, 2), (20201101, 3, 3), (20201102, 4, 4)).toDF("range", "hash", "value")
LakeSoulTable.forPath(tablePath).upsert(dfUpsert)
# python
from pyspark.sql.types import *
from tables import LakeSoulTable
tablePath = "file:/tmp/lakesoul_namespace/lakesoul_upsert_table"
df = spark.createDataFrame([(20201101, 1, 1), (20201101, 2, 2), (20201101, 3, 3), (20201102, 4, 4)], schema='range int,hash int,value int')
df.write.format("lakesoul").mode("overwrite").option("rangePartitions", "range").option("hashPartitions", "hash").option("hashBucketNum", 2).save(tablePath)
dfUpsert = spark.createDataFrame([(20201111, 1, 1), (20201111, 2, 2), (20201111, 3, 3), (20201112, 4, 4)], schema='range int,hash int,value int')
LakeSoulTable.forPath(spark,tablePath).upsert(dfUpsert)
Update Data
LakeSoul tables can be updated with a DataFrame or with the standard UPDATE statement. To update data in a table with a DataFrame, use the updateExpr API of LakeSoulTable.
- Spark SQL
- Scala
- PySpark
UPDATE lakesoul_table SET name = 'David' WHERE id = 2;
// Scala
val tablePath = "file:/tmp/lakesoul_namespace/lakesoul_table"
LakeSoulTable.forPath(tablePath).updateExpr("id = 2", Seq(("name"->"'David'")).toMap)
# python
from tables import LakeSoulTable
tablePath = "file:/tmp/lakesoul_namespace/lakesoul_upsert_table"
LakeSoulTable.forPath(spark,tablePath).update("hash = 4", { "value":"5"})
Delete Data
Records can be deleted from a LakeSoul table with a DataFrame or with the standard DELETE statement. To delete data from a table with a DataFrame, use the delete API of LakeSoulTable.
- Spark SQL
- Scala
- PySpark
DELETE FROM lakesoul_table WHERE id = 1;
// Scala
val tablePath = "file:/tmp/lakesoul_namespace/lakesoul_table"
LakeSoulTable.forPath(tablePath).delete("id = 1 or id = 2")
# python
from tables import LakeSoulTable
tablePath = "file:/tmp/lakesoul_namespace/lakesoul_upsert_table"
LakeSoulTable.forPath(spark,tablePath).delete("hash = 4")
Query Data
LakeSoul tables can be queried with the DataFrame API or with Spark SQL.
- Spark SQL
- Scala
- PySpark
SELECT * FROM lakesoul_table;
// Scala
val tablePath = "file:/tmp/lakesoul_namespace/lakesoul_table"
// query data with the DataFrameReader API
spark.read.format("lakesoul").load(tablePath)
// query data with the LakeSoulTable API, by table path
LakeSoulTable.forPath(tablePath).toDF
// query data with the LakeSoulTable API, by table name
val tableName = "lakesoul_table"
LakeSoulTable.forName(tableName).toDF
# python
from tables import LakeSoulTable
tablePath = "file:/tmp/lakesoul_namespace/lakesoul_upsert_table"
# query data with the LakeSoulTable API
LakeSoulTable.forPath(spark, tablePath).toDF().show()
# query data with the DataFrameReader API
spark.read.format("lakesoul").load(tablePath).show()
Time Travel Queries
LakeSoul supports time travel queries: you can query a table as of any point in time in the past, or query the data that changed between two commit times.
// Scala
val tablePath = "file:/tmp/lakesoul_namespace/cdc_table"
Seq(("range1", "hash1", "insert"), ("range2", "hash2", "insert"), ("range3", "hash2", "insert"), ("range4", "hash2", "insert"), ("range4", "hash4", "insert"), ("range3", "hash3", "insert"))
.toDF("range", "hash", "op")
.write
.mode("append")
.format("lakesoul")
.option("rangePartitions", "range")
.option("hashPartitions", "hash")
.option("hashBucketNum", "2")
.option("shortTableName", "cdc_table")
.option("lakesoul_cdc_change_column", "op")
.save(tablePath)
// record the version of 1st commit
import java.text.SimpleDateFormat
val versionA: String = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(System.currentTimeMillis)
val lakeTable = LakeSoulTable.forPath(tablePath)
lakeTable.upsert(Seq(("range1", "hash1-1", "delete"), ("range2", "hash2-10", "delete"))
.toDF("range", "hash", "op"))
// record the version of 2nd commit
val versionB: String = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(System.currentTimeMillis)
lakeTable.upsert(Seq(("range1", "hash1-13", "insert"), ("range2", "hash2-13", "update"))
.toDF("range", "hash", "op"))
lakeTable.upsert(Seq(("range1", "hash1-15", "insert"), ("range2", "hash2-15", "update"))
.toDF("range", "hash", "op"))
// record the version of 3rd,4th commits
val versionC: String = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(System.currentTimeMillis)
Full Query
// Scala
spark.sql("SELECT * FROM cdc_table")
Snapshot Query
LakeSoul supports snapshot queries, which read the table data as of a given point in time in the past.
// Scala
spark.read.format("lakesoul")
.option(LakeSoulOptions.PARTITION_DESC, "range=range2")
.option(LakeSoulOptions.READ_END_TIME, versionB)
.option(LakeSoulOptions.READ_TYPE, LakeSoulOptions.ReadType.SNAPSHOT_READ)
.load(tablePath)
Incremental Query
LakeSoul supports incremental queries, which return the data records that changed between a start time and an end time.
// Scala
spark.read.format("lakesoul")
.option(LakeSoulOptions.PARTITION_DESC, "range=range1")
.option(LakeSoulOptions.READ_START_TIME, versionA)
.option(LakeSoulOptions.READ_END_TIME, versionB)
.option(LakeSoulOptions.READ_TYPE, LakeSoulOptions.ReadType.INCREMENTAL_READ)
.load(tablePath)
More Examples
Next, you can learn more about working with LakeSoul tables in Spark from the Spark API documentation.